package com.youlai.rocketmq.producer.tx;


import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @author zc
 * @date 2022-10-27 00:56
 */
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor
public class TxProducerListener implements RocketMQLocalTransactionListener {


    /**
     * 记录各个事务Id的状态:1-正在执行，2-执行成功，3-执行失败
     */
    private ConcurrentHashMap<String, Integer> transMap = new ConcurrentHashMap<>();

    /**
     * 执行本地事务
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Boolean isOpenTx = (Boolean) arg;
        if(isOpenTx){
            log.info("提交事务");
            return RocketMQLocalTransactionState.COMMIT;
        }else{
            log.info("回滚事务");
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }


    /**
     * 事务超时，回查方法
     * 检查本地事务,如果RocketMQ长时间(1分钟左右)没有收到本地事务的返回结果，则会定时主动执行改方法，查询本地事务执行情况。
     *
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("检查事务");
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}
